<?php
namespace app;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use think\facade\Log;

class MqProducer
{
    public static function pushMessage($data)
    {
        $param = config('rabbitmq.AMQP');
        $amqpDetail = config('rabbitmq.email_queue');
        $connection = new AMQPStreamConnection(
            $param['host'],
            $param['port'],
            $param['login'],
            $param['password'],
            $param['vhost']
        );
        $channel = $connection->channel();
        /*
             name: $queue  创建队列
             passive: false
             持久durable: true // //队列将在服务器重启后继续存在
             互斥exclusive: false // 队列可以通过其他渠道访问
             auto_delete: false 通道关闭后，队列不会被删除
         */
        $channel->queue_declare($amqpDetail['queue_name'], false, true, false, false);

        /*
            name: $exchange  创建交换机
            type: direct   直连方式
            passive: false
            durable: true  持久// 交换器将在服务器重启后继续存在
            auto_delete: false //一旦通道关闭，交换器将不会被删除。
        */
        $channel->exchange_declare($amqpDetail['exchange_name'], 'direct', false, true, false);

        /*
             $messageBody:消息体
             content_type:消息的类型 可以不指定
             delivery_mode:消息持久化最关键的参数
             AMQPMessage::DELIVERY_MODE_NON_PERSISTENT = 1;
             AMQPMessage::DELIVERY_MODE_PERSISTENT = 2;
         */
        $messageBody = $data;
        $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
        $channel->basic_publish($message, $amqpDetail['exchange_name'],$amqpDetail['route_key']);
        $channel->close();
        $connection->close();
    }

    public static  function fanout($data){
        $param      = config('rabbitmq.AMQP');
        $amqpDetail = config('rabbitmq.fanout_queue');
        $connection = new AMQPStreamConnection(
            $param['host'], //主机名
            $param['port'], //端口号
            $param['login'], //rabbitmq用户名
            $param['password'], //rabbitmq密码
            $param['vhost']   //虚拟主机【起到消息隔离的作用】，在此不详解
        );

        //将要发送数据变为json字符串
        $messageBody = json_encode($data);

        //连接信道
        $channel = $connection->channel();

        /*
         * 流量控制 Specifies QoS
         *      消费者在开启acknowledge的情况下，对接收到的消息需要异步对消息进行确认
         *      由于消费者自身处理能力有限，从rabbitmq获取一定数量的消息后，希望rabbitmq不再将队列中的消息推送过来，
         *      当对消息处理完后（即对消息进行了ack，并且有能力处理更多的消息）再接收来自队列的消息
         * @param int $prefetch_size   最大unacked消息的字节数
         * @param int $prefetch_count  最大unacked消息的条数
         * @param bool $a_global       上述限制的限定对象，false限制单个消费者，true限制整个通道
         * @return mixed
         */
        $channel->basic_qos(0, 1, false);

        /*
         * 创建交换机(Exchange)
         * name: vckai_exchange// 交换机名称
         * type: direct        // 交换机类型，分别为direct/fanout/topic，参考另外文章的Exchange Type说明。
         * passive: false      // 如果设置true存在则返回OK，否则就报错。设置false存在返回OK，不存在则自动创建
         * durable: false      // 是否持久化，设置false是存放到内存中的，RabbitMQ重启后会丢失
         * auto_delete: false  // 是否自动删除，当最后一个消费者断开连接之后队列是否自动被删除
         */
        $channel->exchange_declare($amqpDetail['exchange_name'], $amqpDetail['exchange_type'], false, true, false);

        /*
         * 创建AMQP消息类型
         * $messageBody:消息体
         * delivery_mode 消息是否持久化
         *      AMQPMessage::DELIVERY_MODE_NON_PERSISTENT = 1; 不持久化
         *      AMQPMessage::DELIVERY_MODE_PERSISTENT = 2; 持久化
         */
        $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));

        /*
         * 发送消息
         * msg       // AMQP消息内容
         * exchange  // 交换机名称
         */
        $channel->basic_publish($message, $amqpDetail['exchange_name']);
        $channel->close();
        $connection->close();
    }


    public static  function delayedMessage($data,$time = 5){
        $param      = config('rabbitmq.AMQP');
        $amqpDetail = config('rabbitmq.order_queue');
        $connection = new AMQPStreamConnection(
            $param['host'], //主机名
            $param['port'], //端口号
            $param['login'], //rabbitmq用户名
            $param['password'], //rabbitmq密码
            $param['vhost']   //虚拟主机【起到消息隔离的作用】，在此不详解
        );

        //将要发送数据变为json字符串
        $messageBody = json_encode($data);

        //连接信道
        $channel = $connection->channel();

        /*
         * 流量控制 Specifies QoS
         *      消费者在开启acknowledge的情况下，对接收到的消息需要异步对消息进行确认
         *      由于消费者自身处理能力有限，从rabbitmq获取一定数量的消息后，希望rabbitmq不再将队列中的消息推送过来，
         *      当对消息处理完后（即对消息进行了ack，并且有能力处理更多的消息）再接收来自队列的消息
         * @param int $prefetch_size   最大unacked消息的字节数
         * @param int $prefetch_count  最大unacked消息的条数
         * @param bool $a_global       上述限制的限定对象，false限制单个消费者，true限制整个通道
         * @return mixed
         */
        $channel->basic_qos(0, 1, false);

        /*
         * 创建交换机(Exchange)
         * name: vckai_exchange// 交换机名称
         * type: direct        // 交换机类型，分别为direct/fanout/topic，参考另外文章的Exchange Type说明。
         * passive: false      // 如果设置true存在则返回OK，否则就报错。设置false存在返回OK，不存在则自动创建
         * durable: false      // 是否持久化，设置false是存放到内存中的，RabbitMQ重启后会丢失
         * auto_delete: false  // 是否自动删除，当最后一个消费者断开连接之后队列是否自动被删除
         */
        $channel->exchange_declare($amqpDetail['exchange_name'],
            $amqpDetail['exchange_type'],
            false,
            true,
            false,
            false,
            false,
            new \PhpAmqpLib\Wire\AMQPTable([
                'x-delayed-type' => \PhpAmqpLib\Exchange\AMQPExchangeType::DIRECT
            ])
        );

        /*
         * 创建AMQP消息类型
         * $messageBody:消息体
         * delivery_mode 消息是否持久化
         *      AMQPMessage::DELIVERY_MODE_NON_PERSISTENT = 1; 不持久化
         *      AMQPMessage::DELIVERY_MODE_PERSISTENT = 2; 持久化
         */

        $headers = new \PhpAmqpLib\Wire\AMQPTable(['x-delay' => $time*1000]);
        $message = new AMQPMessage($messageBody, ['delivery_mode' =>  AMQPMessage::DELIVERY_MODE_PERSISTENT]);
        $message->set('application_headers', $headers);
        $channel->basic_publish($message, $amqpDetail['exchange_name']);

//        $datetime = date('Y/m/d H:i:s');
//        echo "成功发送延迟消息 : {$messageBody} , {$datetime} \n";

        /*
         * 发送消息
         * msg       // AMQP消息内容
         * exchange  // 交换机名称
         */
        $channel->close();
        $connection->close();
    }
}
